浅谈Async Generator

发布于 · 最后修改时间 · 标签: javascript cogitation

会议初学编程的时候,那时候学的还是asp.net,然后看到substrack一个演讲视频:Harnessing The Awesome Power Of Streams,觉得:“哇!流这种编程思想我要学!”,从此就开始入了nodejs的坑。然而直到现在2018年,async iterator的出现,才勉勉强强提供了一种语法层级的流编程的体验。

一开始,基于事件编程,开发者通过监听一个个事件(从on("data", fn)开始),来模拟事件流程。这勉强算是一种实现,它最大的问题是需要创建大量的callback,维护它的代价就是需要书写大量冗余的代码与抽象的封装才能勉强达到稳定可用的级别。
事情的转机从Generator的出现开始,与其并行推广的还有提早一阵子出现的Promise。大部分人对Generator的使用无非就是co这个库的骚操作。不过确实,单纯Generator这个语法特性,很难在jser里头引起什么大风浪,毕竟js里头基本都是异步编程,Generator除了模拟现在的async/await以外很难有大舞台。

Async Generator

有趣的在async iterator这个语法出台,Generator才算正式杀入js的异步编程。

入门

举个简单的例子:

const stream = fs.fs.createReadStream('./big-file');
for await(const data of stream){
  console.log(data)
}

这种写法相比on('data', fn)这种写法,最大的区别在于资源的控制与利用上:基于事件的监听,nodejs会尽可能也必须尽可能快速地去触发data事件,而并不知道你到底有多么需要这些data,反正它就是冲着榨干硬件资源的操作去无脑触发就是了(当然你也可以直接使用文件句柄来手控操作,也能规避这个问题)。

进阶

再举一个实践的例子,我最近在做的节点扫描:从一个节点扫描出N个节点,然后再从这N个节点中扫描下一级的节点,不断收集,直到满足需求为止,伪代码如下:

type Peer = { origin:string; level: number };
class PeerService {
  peerList = [{origin:'http://peer.com:777', level :1 }]; // 初始节点
  async *searchPeers(
    enter_port_peers = this.peerList, // 初始的节点
    collection_peers = new Map<string, Peer>(), // 节点去重用的表
    parallel_pool = new ParallelPool<Peer[]>(2), // 1. 并行池,可以同时执行2个任务
  ): AsyncIterableIterator<Peer> {
    const self = this; // Generator function 无法与箭头函数混用,所以这里的this必须主动声明在外部。
    /*递归搜索代码片段*/
    const recursiveSearch = async function*(skip_when_no_full?: boolean) {
      for await (const peers of parallel_pool.yieldResults({
        ignore_error: true, // 忽略错误(忽略不可用的节点)
        skip_when_no_full, // 在池子不填满的情况下是否返回
      })) {
        for (const peer of peers) {
          yield peer;// 3. 先返回节点,然后再递归搜索
          yield* self.searchPeers([peer], collection_peers, parallel_pool);
        }
      }
    };

for (const enter_port_peer of enter_port_peers) {
// 2. 向并行池中添加任务
parallel_pool.addTaskExecutor(() =>
this._searchPeers(enter_port_peer, collection_peers),
);
yield recursiveSearch(true);// 如果并行池满了,等待一个任务完成后再继续填充
}
yield recursiveSearch();
}
private async _searchPeers(
enter_port_peer: typeof PEERS[0],
collection_peers: Map<string, Peer>,
) {
// TODO: 使用API获取与之相连的节点集合,并使用collection_peers去重
return [] as Peer[];
}
}

以上代码中有三个重点:
1️⃣ 并行池的设计,是因为网络的数据请求应该尽可能预先加载,这里默认能同时有2数据请求在执行
2️⃣ 我在一开始根据一个已知的节点表,把所有的下载任务尽肯能添加到池子中
3️⃣ 每个任务返回的是一组节点,但以流的思想,拆解成一个个往外返回。外部拿到这个节点后,可以进行一些分析,比如试探这个节点是否正常,延迟多少等等。如果外部觉得节点够多了,这个生成器就不需要再执行下去。如果不够,那么就继续搜索下去。要注意的时候,在返回到外部的时候,因为我们使用了并行池的设计,所以理论上还有其它的一些网络请求任务也在执行。

当然,有这个并行池的设计,你要把上面的代码改成基于事件触发的也不是不行。无非是多写点代码,多创建一些函数,通过代码拼接来完成整个AsyncIterator肯定也是可以的。async Generator只是提供给我们一种更加直观的写法与思考方式。

未来

语法特性的增加,最大的好处就是减少了错误的发生。
这可能与其它一些语言的设计哲学并不相同,有的语言追求有且只有一种写法来实现需求。而js则是告诉你你这段代码还能简单+精简+可读+可维护+。
按现有的提案,管道操作符|>与函数部分执行fn(1, ?)这两个语法如果通过提案了,对于流编程也会带来很大的便利性。